package com.learn.eshdoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

import java.io.IOException;

/**
 * Reads text files from HDFS and writes every non-blank line into Elasticsearch.
 *
 * <p>Each input line is sent to Elasticsearch as a raw JSON document ({@code es.input.json} is
 * enabled), so the input is expected to be newline-delimited JSON.
 *
 * <p>Usage: {@code hadoop jar <jar> com.learn.eshdoop.HdfsToEs [generic options] <input path>}.
 * The {@code es.*} settings applied by this driver are only defaults: any of them can be
 * overridden on the command line with {@code -D key=value}.
 */
public class HdfsToEs {

	/** Default Elasticsearch node address, used when {@code es.nodes} is not supplied. */
	private static final String DEFAULT_ES_NODES = "10.10.139.42";

	/**
	 * Configures and runs the HDFS-to-Elasticsearch job, then exits the JVM with the job status.
	 *
	 * @param args Hadoop generic options followed by the HDFS input path (file, directory or glob)
	 * @throws Exception if job submission or execution fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Parses -D/-conf/... options into conf and returns the application-specific arguments.
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length < 1) {
			System.err.println("Usage: HdfsToEs [generic options] <input path>");
			System.exit(2);
			return;
		}

		// Speculative (duplicate) task attempts would write the same documents twice, so these are
		// forced off regardless of what the command line says.
		conf.setBoolean("mapred.map.tasks.speculative.execution", false);
		conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
		applyEsDefaults(conf);

		Job job = Job.getInstance(conf, "EmrToES");
		job.setJarByClass(HdfsToEs.class);
		job.setMapperClass(MyMapper.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(EsOutputFormat.class);
		job.setMapOutputKeyClass(NullWritable.class);
		job.setMapOutputValueClass(Text.class);
		// There is no aggregation to do: run map-only so each map task writes straight to
		// Elasticsearch instead of funnelling every document through a single identity reducer.
		job.setNumReduceTasks(0);
		FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	/**
	 * Applies the default es-hadoop connector settings. A value already present in {@code conf}
	 * (for example one passed with {@code -D}) is left untouched.
	 *
	 * @param conf the job configuration to populate
	 */
	private static void applyEsDefaults(Configuration conf) {
		conf.setIfUnset("es.nodes", DEFAULT_ES_NODES);
		conf.setIfUnset("es.port", "9200");
		conf.setIfUnset("es.nodes.wan.only", "true");
		conf.setIfUnset("es.resource", "blog/_doc");
		// Per the es-hadoop configuration docs: take the document _id from the "id" field.
		conf.setIfUnset("es.mapping.id", "id");
		// Input lines are already JSON documents and are passed through as-is.
		conf.setIfUnset("es.input.json", "yes");
	}

	/**
	 * Forwards each non-blank input line unchanged, with a {@link NullWritable} key.
	 *
	 * <p>The input key is the line's byte offset produced by {@link TextInputFormat}; it is ignored.
	 */
	public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {

		@Override
		protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			// Empty or whitespace-only lines (e.g. a trailing "\r" from CRLF files) are not valid JSON
			// documents and would make the bulk request fail, so drop them here.
			if (!isBlank(value)) {
				context.write(NullWritable.get(), value);
			}
		}

		/**
		 * Returns whether {@code text} is empty or consists only of ASCII whitespace.
		 *
		 * <p>Scanning the raw UTF-8 bytes is safe because ASCII whitespace bytes never occur inside
		 * a multi-byte UTF-8 sequence.
		 */
		private static boolean isBlank(Text text) {
			byte[] bytes = text.getBytes();
			int length = text.getLength(); // the backing array may be longer than the valid data
			for (int i = 0; i < length; i++) {
				byte b = bytes[i];
				if (b != ' ' && b != '\t' && b != '\n' && b != '\r' && b != '\f' && b != 0x0B) {
					return false;
				}
			}
			return true;
		}
	}
}
